/*******************************************************************************
 * Package: com.song.main
 * Type:    TestMap
 * Date:    2024-11-21 19:00
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.visit;

import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * 函数式编程 完善说明
 *
 * @author Songxianyang
 * @date 2024-11-21 19:00
 */
public class UserVisitMap2 {

    public static void main(String[] args) {
        // 1.创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("yqmm-spark/file/user_visit_action.txt");
        // 行 过滤掉无用数据  （搜索） 将数据扁平化
        lines.filter(row -> {

                    String[] s = row.split("_");
                    return "null".equals(s[5]);
                }
        )
                // 找到点击 下单 和支付 映射成新的对象UserVisitDto
                .flatMap(row -> {
                    // 不等于-1就是点击
                    String[] s = row.split("_");
                    if (!StrUtil.equals("-1", s[6])) {
                        // 点击 (品类ID，1，  0，      0)
                        return Lists.newArrayList(new UserVisitDto(s[6], 1L, 0L, 0L)).iterator();
                    }

                    if (!StrUtil.equals("null", s[8])) {
                        // 下单 (品类ID，0，  1，      0)
                        String[] split = s[8].split(",");
                        List<UserVisitDto> list = new ArrayList<>();
                        for (String s1 : split) {
                            list.add(new UserVisitDto(s1, 0L, 1L, 0L));
                        }
                        return list.iterator();
                    }


                    // 支付 (品类ID，0，  0，      1)
                    String[] split = s[10].split(",");
                    List<UserVisitDto> list = new ArrayList<>();
                    for (String s1 : split) {
                        list.add(new UserVisitDto(s1, 0L, 0L, 1L));
                    }
                    return list.iterator();
                })
                // 转换 配对  (品类, (品类，1，  0，      0))
                .mapToPair(value -> new Tuple2<>(value.getProductId(), value))
                // 该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合
                .reduceByKey((userVisitDto, userVisitDto2) -> {
                    userVisitDto.setNumberOrders(userVisitDto.getNumberOrders() + userVisitDto2.getNumberOrders());
                    userVisitDto.setClicks(userVisitDto.getClicks() + userVisitDto2.getClicks());
                    userVisitDto.setNumberPayments(userVisitDto.getNumberPayments() + userVisitDto2.getNumberPayments());
                    return userVisitDto;
                })
                // (品类，1，  0，      0)
                .map(value -> value._2())
                // 排序  需要类实现Comparable
                .sortBy(obj -> obj, true, 2)

                .take(10).forEach(System.out::println);


        sc.close();
    }
}
